package com.bw.gmall.app.dwd;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.bw.gmall.utils.MyKafkaUtil;
import com.bw.gmall.utils.MysqlUtil;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.util.Collector;

public class DwdSkuInfoCoupon {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<String> ds = MysqlUtil.cdcMysql(env, "gmall", "sku_info");

        SingleOutputStreamOperator<JSONObject> jsonDS = ds.flatMap(new FlatMapFunction<String, JSONObject>() {
            @Override
            public void flatMap(String value, Collector<JSONObject> out) throws Exception {
                try {
                    JSONObject jsonObject = JSON.parseObject(value);
                    out.collect(jsonObject);
                } catch (Exception e) {
                    System.out.println("脏数据" + value);
                }
            }
        });
        jsonDS.print();
        FlinkKafkaProducer<String> dwd_sku_info_add = MyKafkaUtil.getFlinkKafkaProducer("dwd_sku_info_add");
        jsonDS.map(a->a.toJSONString()).addSink(dwd_sku_info_add);

        env.execute();
    }
}
